-
Notifications
You must be signed in to change notification settings - Fork 13
feat: Metadata propagation in source, source transformer, mapper, sink #210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: vtiwari5 <[email protected]>
Signed-off-by: vtiwari5 <[email protected]>
Signed-off-by: vtiwari5 <[email protected]>
… transform, mapper and sinker Signed-off-by: Vaibhav Tiwari <[email protected]>
yhl25
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Use function overloading and introduce a method for constructing on success response using the
Datum. - Remove onSuccess function for proto and use a private method.
- Add an example(could be a follow-up PR).
Signed-off-by: Vaibhav Tiwari <[email protected]>
Signed-off-by: Vaibhav Tiwari <[email protected]>
| log.info("Writing to onSuccess sink: {}", datum.getId()); | ||
| // Build the onSuccess message using builder for changing values, keys or userMetadata | ||
| responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), | ||
| Message.builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Value is a mandatory field, lets stick to constructor now. We can add support for builders eventually.
| * @param message The message object to convert into the relevant proto object | ||
| * @return The converted proto object | ||
| */ | ||
| public static SinkOuterClass.SinkResponse.Result.Message toProto(Message message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need expose this to users, Can we make it package private or move to a dedicate private helper method?
| */ | ||
| public static Response responseOnSuccess(String id, SinkResponse.Result.Message onSuccessMessage) { | ||
| return new Response(id, false, null, false, false, null, true, onSuccessMessage); | ||
| public static Response responseOnSuccess(Datum datum) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we already have Message.fromDatum. I think this can be avoided.
| @Test | ||
| public void testAddKV_ignoresNulls() { | ||
| UserMetadata metadata = new UserMetadata(); | ||
|
|
||
| metadata.addKV(null, "k", "v".getBytes()); | ||
| metadata.addKV("g", null, "v".getBytes()); | ||
| metadata.addKV("g", "k", null); | ||
|
|
||
| assertTrue(metadata.getGroups().isEmpty()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAddKV_defensiveCopy() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge tests to cover different cases.
Fixes: #205
Notes
Change in OnSuccess message metadata
With the introduction of a shared
UserMetadataclass, theuserMetadatamember inMessageclass for OnSuccess message has been changed from:to
This is done so that the users can utilize the same UserMetadata for onSuccess message without conversion.
Furthermore, the new UserMetadata has been implemented in its current form since it allows parity with other SDKs (inner data holds Map of Map, instead of Map of KeyValueGroup) and also prevents the extra step of converting proto KeyValueGroup object to local KeyValueGroup class object and back.
Format of userMetadata and systemMetadata
Both
UserMetadataandSystemMetadataclasses have inner data in the format: